
[SPARK-26758][core]Idle Executors are not getting killed after spark.dynamiAllocation.executorIdleTimeout value #23697

Closed

Conversation

sandeep-katta
Contributor

What changes were proposed in this pull request?

The updateAndSyncNumExecutorsTarget API should be called after the initializing flag is unset.

How was this patch tested?

Added UT and also manually tested

After Fix
![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png)

@srowen
Member

srowen commented Jan 30, 2019

I get the idea, but I note that the current logic seems to be on purpose according to the comments. The comment would have to be updated too.

So the scenario here is that no stages have been submitted, but the first scheduled check at 60s won't kill the idle executors. The second one at 120s will, right? Yeah, that's a minor thing, but worth trying to fix.

This change has some other effects like killing idle executors before deciding whether more are needed. Maybe that causes it to kill and then recreate an executor to match a minimum, which isn't great, but, at the same time, currently it will check whether it needs to add executors and then kill expired ones right after, which also seems odd.

Another possible fix is simply to set initializing to false in the second call to schedule() (the first one that happens after the initial one at time 0) but maybe that's also kind of complex to reason about.

(By the way you could also delete totalRunningTasks while here as it's not used, but not important.)
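Roughly, the pre-fix ordering inside schedule() looks like the sketch below. This is an illustrative, compilable stub pieced together from the points discussed in this thread (the initializing bail-out, the retain scan, the target check in removeExecutors); it is not the actual ExecutorAllocationManager source.

```scala
import scala.collection.mutable

object ScheduleOrderingSketch {
  var initializing = true                              // flipped to false only once an idle executor expires
  var numExecutorsTarget = 3                           // stays at the initial value while initializing
  val removeTimes = mutable.Map.empty[String, Long]    // executorId -> idle-expiry timestamp (ms)

  def updateAndSyncNumExecutorsTarget(now: Long): Unit = {
    if (initializing) return                           // no-op while still "initializing"
    // ...otherwise recompute numExecutorsTarget from current demand...
  }

  def removeExecutors(ids: Seq[String]): Unit = {
    // Declines to kill executors that would drop the count below numExecutorsTarget.
  }

  def schedule(now: Long): Unit = {
    updateAndSyncNumExecutorsTarget(now)               // 1. decide the target (a no-op on this pass)
    val expired = mutable.ArrayBuffer.empty[String]
    removeTimes.retain { case (id, expireTime) =>      // 2. detect expiry; this is where
      val isExpired = now >= expireTime                //    initializing finally becomes false
      if (isExpired) { initializing = false; expired += id }
      !isExpired
    }
    if (expired.nonEmpty) removeExecutors(expired)     // 3. try to kill, still blocked by the stale target
  }
}
```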

@sandeep-katta
Contributor Author

> I get the idea, but I note that the current logic seems to be on purpose according to the comments. The comment would have to be updated too.
>
> So the scenario here is that no stages have been submitted, but the first scheduled check at 60s won't kill the idle executors. The second one at 120s will, right? Yeah, that's a minor thing, but worth trying to fix.
>
> This change has some other effects like killing idle executors before deciding whether more are needed. Maybe that causes it to kill and then recreate an executor to match a minimum, which isn't great, but, at the same time, currently it will check whether it needs to add executors and then kill expired ones right after, which also seems odd.
>
> Another possible fix is simply to set initializing to false in the second call to schedule() (the first one that happens after the initial one at time 0), but maybe that's also kind of complex to reason about.
>
> (By the way you could also delete totalRunningTasks while here as it's not used, but not important.)

After the first scheduled check at 60s, the expired executors are filtered out of the list; code here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L318. So on the second run removeTimes will be empty.

This change only makes sure the initializing flag is used properly; the rest of the logic is the same.

Yes, of course I can delete totalRunningTasks.
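A tiny standalone snippet (plain Scala, not Spark code) showing that retain behaviour: once the expired entries are dropped on the first pass, there is nothing left for later passes to act on.

```scala
import scala.collection.mutable

object RetainDemo extends App {
  // executorId -> expiry timestamp in seconds; all three went idle at t = 0 with a 60s timeout
  val removeTimes = mutable.Map("1" -> 60L, "2" -> 60L, "3" -> 60L)

  def expireAt(now: Long): Seq[String] = {
    val expired = mutable.ArrayBuffer.empty[String]
    removeTimes.retain { case (id, t) =>    // Scala 2.12 API; use filterInPlace on 2.13+
      if (now >= t) { expired += id; false } else true
    }
    expired.toSeq
  }

  println(expireAt(60))    // 1, 2, 3 are reported as expired (removal was blocked in the real scenario)
  println(removeTimes)     // the map is already empty
  println(expireAt(120))   // nothing left to consider on the next pass
}
```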

@srowen
Member

srowen commented Jan 30, 2019

Just to confirm, they won't be removed at 60s but will be removed at 120s, but of course we expect them to be removed at 60s, right?

Yeah that was kind of my point, can we just set initializing = false somewhere else to fix this? That's not hard either, but I kind of think this change is an OK way to do it for other reasons (i.e. I don't quite get why you'd remove idle executors right after checking if the number of executors is right).

@sandeep-katta
Contributor Author

sandeep-katta commented Jan 30, 2019

> Just to confirm, they won't be removed at 60s but will be removed at 120s, but of course we expect them to be removed at 60s, right?
>
> Yeah that was kind of my point, can we just set initializing = false somewhere else to fix this? That's not hard either, but I kind of think this change is an OK way to do it for other reasons (i.e. I don't quite get why you'd remove idle executors right after checking if the number of executors is right).

No, they won't be removed at 120s either. removeTimes is the map that holds the executor IDs with their timestamps; at 60s all the executors are expired and they are cleared from the removeTimes map. So at 120s this map is empty and the executors will not be removed.

Maybe if you look at the driver logs below you can see why I am checking the target executors before removing idle executors:

2019-01-30 14:16:39,134 | INFO | spark-dynamic-executor-allocation | Request to remove executorIds: 2, 1 | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-01-30 14:16:39,135 | DEBUG | spark-dynamic-executor-allocation | Not removing idle executor 2 because there are only 3 executor(s) left (number of executor target 3) | org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)

Since the initializing flag was still true, updateAndSyncNumExecutorsTarget was not resetting numExecutorsTarget to the number of executors actually needed.

Or I can do one more fix:

Call scheduleTask with some initial delay, like below:
executor.scheduleWithFixedDelay(scheduleTask, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)

So the first call to schedule comes at 60s; then set initializing to false at the start of this function, and this problem will be solved.
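A minimal sketch of that alternative, assuming start() currently schedules the task with an initial delay of 0; the interval value below is illustrative, not Spark's actual setting.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object DelayedFirstCheckSketch {
  private val executor = Executors.newSingleThreadScheduledExecutor()
  private val intervalMillis = 100L   // illustrative value only

  @volatile private var initializing = true

  private val scheduleTask: Runnable = () => {
    // Because there is no run at time 0, the first invocation happens one full
    // interval after start(), so initialization can be considered finished here.
    initializing = false
    // ... rest of schedule() ...
  }

  def start(): Unit =
    // Initial delay = intervalMillis instead of 0, as proposed above.
    executor.scheduleWithFixedDelay(scheduleTask, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
}
```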

@srowen
Member

srowen commented Jan 30, 2019

Hm, isn't schedule() called at 0s, 60s, and 120s here? After 60s, initializing is false (because it checked for expired executors), but it was true when checking whether to update the executors just before that in schedule(), so nothing happened.

Just to be clear, is the current problem that they should be checked at 60s, but aren't actually checked until 120s? Because you say you're checking the target before removing executors, but the change makes it happen after.

Yes, your second option is also what I was thinking, but I don't know the implications of removing the call to schedule() at 0s. Your current change feels OK as it seems to improve the logic otherwise.

@sandeep-katta
Contributor Author

sandeep-katta commented Jan 30, 2019

The current problem is that executors are never cleaned up, even after 60s, 120s, 180s (assuming no stage is submitted).

The reason is that at 60s removeTimes retains only those executors which are not expired. Since all the executors are expired, all the entries are removed from this map, so at 120s the map is empty and removeExecutors is not called.

@srowen
Member

srowen commented Jan 30, 2019

Ah, I'm understanding better now. So, is the issue that an executor that has never gotten any task is never recorded in removeTimes, so is never removed? Looks like executors are marked as idle initially in onExecutorAdded, which ought to happen even before any task is scheduled.

Hm, but start() allocates the initial executors, and I'd assume those result in calling onExecutorAdded():

executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)

And onExecutorIdle() does initially mark it as idle and adds it to removeTimes.

I wonder if the problem is somewhere else in there? Maybe you know, as you've looked into it more. I'm still not clear why it wouldn't get removed later, even if this fix does seem to improve the logic.

@SparkQA

SparkQA commented Jan 30, 2019

Test build #4538 has finished for PR 23697 at commit fa06afa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liupc

liupc commented Jan 31, 2019

@sandeep-katta
As I understand it, the executor allocation manager manages the executors according to the tasks we need, so if no stage has been submitted, why should it start doing real work (for instance, removing idle executors as you mentioned above)?

Suppose we have set the dynamic allocation targets as minExecutors < initialExecutors < maxExecutors; if initialExecutors is what we expected at startup, why should we remove idle executors?
If we do, the executor count may not be initialExecutors anymore when a stage is submitted, and it's very likely that the stage will need at least initialExecutors (since that is the initial target set by the user), so a useless reallocation will happen. The reallocation may take a while and delay the overall stage execution time.

So maybe what you describe is not a problem?

@srowen
Member

srowen commented Jan 31, 2019

@liupc the issue is that it doesn't decrease -- if this is correct, if no work is submitted, it never drops to minExecutors. This change doesn't make it expire executors immediately or anything. I do think it's a real problem: it is realistic to imagine running an interactive session with dynamic allocation and a 0 minimum, and maybe you don't actually start executing Spark jobs for a while, or step away. Not removing those executors at all sounds surprising, doesn't it?

@liupc

liupc commented Jan 31, 2019

@srowen Got it, thanks! It sounds reasonable for interactive scenarios. But maybe we should make the definition of the initial target clear: whether it means the initial executors we should provide when a stage begins, or the initial executors just at startup. How we can fix this may depend on that definition.

@sandeep-katta
Contributor Author

Let me explain the problem.

Confs used: spark.dynamicAllocation.initialExecutors=3, spark.dynamicAllocation.minExecutors=0, spark.dynamicAllocation.executorIdleTimeout=60s

Let's say the time is now 00:00:00, and 3 executors are up.

schedule will run at 00:00:00, 00:01:00, 00:02:00, etc.

At 00:00:00
  • removeTimes contains {(1, 00:00:00), (2, 00:00:00), (3, 00:00:00)}
  • executorIdsToBeRemoved is empty

At 00:01:00
  • All the executors are expired
  • removeTimes will be empty as per the retain logic (code here)
  • executorIdsToBeRemoved = {1, 2, 3}
  • but these executors are not removed; check the JIRA for the driver logs

At 00:02:00
  • removeTimes is empty, so executorIdsToBeRemoved is empty
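The same timeline as a self-contained toy model. Field names mirror the ones above (removeTimes, initializing, numExecutorsTarget), but this is only an illustration of the pre-fix flow described in this thread, not the actual ExecutorAllocationManager code.

```scala
import scala.collection.mutable

object IdleExecutorBugDemo extends App {
  val minExecutors = 0                            // spark.dynamicAllocation.minExecutors
  val idleTimeoutSec = 60L                        // spark.dynamicAllocation.executorIdleTimeout

  var initializing = true
  var numExecutorsTarget = 3                      // spark.dynamicAllocation.initialExecutors
  val executorIds = mutable.Set("1", "2", "3")
  val removeTimes = mutable.Map("1" -> idleTimeoutSec, "2" -> idleTimeoutSec, "3" -> idleTimeoutSec)

  def updateAndSyncNumExecutorsTarget(): Unit = {
    if (initializing) return                      // pre-fix behaviour: no-op while "initializing"
    numExecutorsTarget = math.max(minExecutors, 0 /* no pending or running tasks */)
  }

  def removeExecutors(ids: Seq[String]): Unit = ids.foreach { id =>
    if (executorIds.size - 1 < numExecutorsTarget) {
      println(s"  not removing idle executor $id (number of executor target $numExecutorsTarget)")
    } else {
      executorIds -= id
      println(s"  removed idle executor $id")
    }
  }

  def schedule(now: Long): Unit = {
    println(s"schedule() at t=${now}s, removeTimes=${removeTimes.keySet}")
    updateAndSyncNumExecutorsTarget()             // pre-fix ordering: runs before expiry is detected
    val expired = mutable.ArrayBuffer.empty[String]
    removeTimes.retain { case (id, expireTime) =>
      val isExpired = now >= expireTime
      if (isExpired) { initializing = false; expired += id }
      !isExpired
    }
    if (expired.nonEmpty) removeExecutors(expired.toSeq)
  }

  Seq(0L, 60L, 120L).foreach(t => schedule(t))    // executors 1, 2, 3 survive all three passes
}
```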

@srowen
Member

srowen left a comment

Yes, thank you, I see it now. That's clearly a bug, as it intends to update the numExecutorsTarget before removing (according to comments) but that's not what happens. I also agree that the test is wrong as it asserts that the target is not updated after the first call to schedule.

@SparkQA

SparkQA commented Feb 2, 2019

Test build #4544 has finished for PR 23697 at commit 9621464.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Feb 2, 2019

Ah sorry @sandeep-katta that 'unused' private method is used by reflection in the tests. I think that's confusing and maybe we should clean this up later to expose those as private[spark] and just use them directly, but for here you can just revert that change.

@sandeep-katta
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 4, 2019

Test build #4548 has finished for PR 23697 at commit f12bb98.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

srowen pushed a commit that referenced this pull request Feb 5, 2019
….dynamiAllocation.executorIdleTimeout value

## What changes were proposed in this pull request?

**updateAndSyncNumExecutorsTarget**  API should be called after **initializing** flag is unset
## How was this patch tested?
Added UT and also manually tested

After Fix
![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png)

Closes #23697 from sandeep-katta/executorIssue.

Authored-by: sandeep-katta <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 1dd7419)
Signed-off-by: Sean Owen <[email protected]>
@srowen srowen closed this in 1dd7419 Feb 5, 2019
srowen pushed a commit that referenced this pull request Feb 5, 2019
@srowen
Member

srowen commented Feb 5, 2019

Merged to master/2.4/2.3

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
@IgorBerman

IgorBerman commented Nov 13, 2019

@sandeep-katta @srowen I think there are still cases where an executor will remain idle and never be removed, due to the "Not removing idle executor .. because there are only .. executor(s)" decision.

Maybe there is another problem, but IMO such an idle executor is never rescheduled for a "check" to be killed, so executors whose timer triggered before the executor target reached 0 will stay there. Even though this fix updates the executor target to the right value, the idle executor is never added back to the removeTimes map.

(I'm talking about v2.2.3)

wdyt?

@HyukjinKwon
Member

2.2.3 is an EOL release and there won't be more releases in the 2.2.x line. You might have to consider migrating to a higher version.

@sandeep-katta
Contributor Author

@IgorBerman as @HyukjinKwon said, 2.2.x is EOL; maybe if you give me the exact case in 2.4.x or Spark 3, I can think of a solution.

@IgorBerman

Thanks @sandeep-katta, @HyukjinKwon for the answer.
Yes, I missed that there was a whole refactoring in a later version (not sure at which point exactly).
For our internal distro I've reverted #18874 and that solved it.
